89033dde5000158a79d498d7ae091a946bccce89,bridge/src/main/java/io/rhiot/kafka/bridge/SinkBridgeEndpoint.java,SinkBridgeEndpoint,handle,#ProtonLink#,123
Before Change
// Set the AUTO_OFFSET_RESET consumer property to "earliest".
// NOTE(review): presumably this maps to Kafka's auto.offset.reset policy — confirm in BridgeConfig.
props.put(BridgeConfig.AUTO_OFFSET_RESET, "earliest");
// create and start new thread for reading from Kafka
// The runner is built from the consumer properties, the topic/partition/offset to read,
// the Vert.x instance, this.ebName (presumably the event bus address — confirm),
// the sender's QoS and the offset tracker.
// partition and offset are cast to Integer and Long here, so a non-null value of any
// other runtime type fails with ClassCastException at this point.
this.kafkaConsumerRunner = new KafkaConsumerRunner<>(props,
topic, (Integer)partition, (Long)offset,
this.vertx, this.ebName,
sender.getQoS(), this.offsetTracker);
// The runner is executed on a dedicated thread that is created and started manually here.
this.kafkaConsumerThread = new Thread(kafkaConsumerRunner);
this.kafkaConsumerThread.start();
After Change
// Collect the consumer parameters (topic, partition, offset, this.ebName and the sender's QoS)
// in the configuration object that is handed to the worker at deployment time.
// partition and offset are cast to Integer and Long, so a non-null value of any other
// runtime type fails with ClassCastException at this point.
config.put(KafkaConsumerWorker.KAFKA_CONSUMER_TOPIC, topic);
config.put(KafkaConsumerWorker.KAFKA_CONSUMER_PARTITION, (Integer)partition);
config.put(KafkaConsumerWorker.KAFKA_CONSUMER_OFFSET, (Long)offset);
config.put(KafkaConsumerWorker.KAFKA_CONSUMER_EBQUEUE, this.ebName);
config.put(KafkaConsumerWorker.KAFKA_CONSUMER_QOS, sender.getQoS());
// The worker is created with no constructor arguments; the offset tracker is passed
// through a setter rather than through the configuration.
this.kafkaConsumerWorker = new KafkaConsumerWorker<>();
this.kafkaConsumerWorker.setOffsetTracker(this.offsetTracker);
// Deploy the worker instance on this.vertx with the configuration built above.
// NOTE(review): the options set only the config and no completion handler is passed to
// deployVerticle, so a failed deployment is not observed here — confirm this is intended.
DeploymentOptions options = new DeploymentOptions().setConfig(config);
this.vertx.deployVerticle(this.kafkaConsumerWorker, options);
// message sending on AMQP link MUST happen on Vert.x event loop due to
// the access to the sender object provided by Vert.x handler